web scraping with python 代码笔记/code notes part1

整理《web scraping with python》书中代码笔记

beautifulsoup

check errors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from urllib.request import urlopen
from urllib.error import HTTPError
from bs4 import BeautifulSoup
def getTitle(url):
try:
html = urlopen(url)
except HTTPError as e:
return None
try:
bsObj = BeautifulSoup(html.read())
title = bsObj.body.h1
except AttributeError as e:
return None
return title
title = getTitle("http://www.pythonscraping.com/pages/page1.html")
if title == None:
print("Title could not be found")
else:
print(title)

find and find_all

1
2
find_all(tag, attributes, recursive, text, limit, keywords)
find(tag, attributes, recursive, text, keywords)

children,sibling,parents

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# children
from urllib.request import urlopen
from bs4 import BeautifulSoup
html = urlopen("http://www.pythonscraping.com/pages/page3.html")
bsObj = BeautifulSoup(html)
for child in bsObj.find("table",{"id":"giftList"}).children:
print(child)
# siblings
from urllib.request import urlopen
from bs4 import BeautifulSoup
html = urlopen("http://www.pythonscraping.com/pages/page3.html")
bsObj = BeautifulSoup(html)
for sibling in bsObj.find("table",{"id":"giftList"}).tr.next_siblings:
print(sibling)
# parents
from urllib.request import urlopen
from bs4 import BeautifulSoup
html = urlopen("http://www.pythonscraping.com/pages/page3.html")
bsObj = BeautifulSoup(html)
print(bsObj.find("img",{"src":"../img/gifts/img1.jpg"
}).parent.previous_sibling.get_text())

regular expressions

1
2
3
4
5
6
7
8
from urllib.request
import urlopenfrom bs4
import BeautifulSoupimport re
html = urlopen("http://www.pythonscraping.com/pages/page3.html")
bsObj = BeautifulSoup(html)
images = bsObj.findAll("img", {"src":re.compile("\.\.\/img\/gifts/img.*\.jpg")})
for image in images:
print(image["src"])

crawling examples

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# wikipedia
from urllib.request import urlopen
from bs4 import BeautifulSoup
html = urlopen("http://en.wikipedia.org/wiki/Kevin_Bacon")
bsObj = BeautifulSoup(html)
for link in bsObj.findAll("a"):
if 'href' in link.attrs:
print(link.attrs['href'])
# regular expressions
from urllib.request import urlopen
from bs4 import BeautifulSoup
import datetime
import random
import re
random.seed(datetime.datetime.now())
def getLinks(articleUrl):
html = urlopen("http://en.wikipedia.org"+articleUrl)
bsObj = BeautifulSoup(html)
return bsObj.find("div", {"id":"bodyContent"}).find_all("a",
href=re.compile("^(/wiki/)((?!:).)*$"))
links = getLinks("/wiki/Kevin_Bacon")
while len(links) > 0:
newArticle = links[random.randint(0, len(links)-1)].attrs["href"]
print(newArticle)
links = getLinks(newArticle)
# Crawling an Entire Site
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
pages = set()
def getLinks(pageUrl):
global pages
html = urlopen("http://en.wikipedia.org"+pageUrl)
bsObj = BeautifulSoup(html)
for link in bsObj.find_all("a", href=re.compile("^(/wiki/)")):
if 'href' in link.attrs:
if link.attrs['href'] not in pages:
#We have encountered a new page
newPage = link.attrs['href']
print(newPage)
pages.add(newPage)
getLinks(newPage)
getLinks("")
# Collecting Data Across an Entire Site
try:
print(bsObj.h1.get_text())
print(bsObj.find(id ="mw-content-text").find_all("p")[0])
print(bsObj.find(id="ca-edit").find("span").find("a").attrs['href'])
except AttributeError:
print("This page is missing something! No worries though!")
for link in bsObj.findAll("a", href=re.compile("^(/wiki/)")):
if 'href' in link.attrs:
if link.attrs['href'] not in pages:
#We have encountered a new page
newPage = link.attrs['href']
print("----------------\n"+newPage)
pages.add(newPage)
getLinks(newPage)
getLinks("")
# Crawling Across the Internet
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import datetime
import random
pages = set()
random.seed(datetime.datetime.now())
#Retrieves a list of all Internal links found on a page
def getInternalLinks(bsObj, includeUrl):
internalLinks = []
#Finds all links that begin with a "/"
for link in bsObj.findAll("a", href=re.compile("^(/|.*"+includeUrl+")")):
if link.attrs['href'] is not None:
if link.attrs['href'] not in internalLinks:
internalLinks.append(link.attrs['href'])
return internalLinks
#Retrieves a list of all external links found on a page
def getExternalLinks(bsObj, excludeUrl):
externalLinks = []
#Finds all links that start with "http" or "www" that do
#not contain the current URL
for link in bsObj.findAll("a",
href=re.compile("^(http|www)((?!"+excludeUrl+").)*$")):
if link.attrs['href'] is not None:
if link.attrs['href'] not in externalLinks:
externalLinks.append(link.attrs['href'])
return externalLinks
def splitAddress(address):
addressParts = address.replace("http://", "").split("/")
return addressParts
def getRandomExternalLink(startingPage):
html = urlopen(startingPage)
bsObj = BeautifulSoup(html)
externalLinks = getExternalLinks(bsObj, splitAddress(startingPage)[0])
if len(externalLinks) == 0:
internalLinks = getInternalLinks(startingPage)
return getNextExternalLink(internalLinks[random.randint(0,
len(internalLinks)-1)])
else:
return externalLinks[random.randint(0, len(externalLinks)-1)]
def followExternalOnly(startingSite):
externalLink = getRandomExternalLink("http://oreilly.com")
print("Random external link is: "+externalLink)
followExternalOnly(externalLink)
followExternalOnly("http://oreilly.com")
#Collects a list of all external URLs found on the site
allExtLinks = set()
allIntLinks = set()
def getAllExternalLinks(siteUrl):
html = urlopen(siteUrl)
bsObj = BeautifulSoup(html)
internalLinks = getInternalLinks(bsObj,splitAddress(siteUrl)[0])
externalLinks = getExternalLinks(bsObj,splitAddress(siteUrl)[0])
for link in externalLinks:
if link not in allExtLinks:
allExtLinks.add(link)
print(link)
for link in internalLinks:
if link not in allIntLinks:
print("About to get link: "+link)
allIntLinks.add(link)
getAllExternalLinks(link)
getAllExternalLinks("http://oreilly.com")

using APIs

1
2
3
4
5
6
# twitter
from twitter import *
t = Twitter(auth=OAuth(<Access Token>, <Access Token Secret>,
<Consumer Key>, <Consumer Secret>))
statusUpdate = t.statuses.update(status='Hello, world!')
print(statusUpdate)

Parsing JSON

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import json
from urllib.request import urlopen
def getCountry(ipAddress):
response = urlopen("http://freegeoip.net/json/"+ipAddress).read()
.decode('utf-8')
responseJson = json.loads(response)
return responseJson.get("country_code")
print(getCountry("50.78.253.58"))
# second example
import json
jsonString = '{"arrayOfNums":[{"number":0},{"number":1},{"number":2}],
"arrayOfFruits":[{"fruit":"apple"},{"fruit":"banana"},
{"fruit":"pear"}]}'
jsonObj = json.loads(jsonString)
print(jsonObj.get("arrayOfNums"))
print(jsonObj.get("arrayOfNums")[1])
print(jsonObj.get("arrayOfNums")[1].get("number")+
jsonObj.get("arrayOfNums")[2].get("number"))
print(jsonObj.get("arrayOfFruits")[2].get("fruit"))
# Bringing It All Back Home
# looks for revision history pages, and then looks for IP addresses on those revision history pages
from urllib.request import urlopen
from bs4 import BeautifulSoup
import datetime
import random
import re
random.seed(datetime.datetime.now())
def getLinks(articleUrl):
html = urlopen("http://en.wikipedia.org"+articleUrl)
bsObj = BeautifulSoup(html)
return bsObj.find("div", {"id":"bodyContent"}).findAll("a",
href=re.compile("^(/wiki/)((?!:).)*$"))
def getHistoryIPs(pageUrl):
#Format of revision history pages is:
#http://en.wikipedia.org/w/index.php?title=Title_in_URL&action=history
pageUrl = pageUrl.replace("/wiki/", "")
historyUrl = "http://en.wikipedia.org/w/index.php?title="
+pageUrl+"&action=history"
print("history url is: "+historyUrl)
html = urlopen(historyUrl)
bsObj = BeautifulSoup(html)
#finds only the links with class "mw-anonuserlink" which has IP addresses
#instead of usernames
ipAddresses = bsObj.findAll("a", {"class":"mw-anonuserlink"})
addressList = set()
for ipAddress in ipAddresses:
addressList.add(ipAddress.get_text())
return addressList
links = getLinks("/wiki/Python_(programming_language)")
while(len(links) > 0):
for link in links:
print("-------------------")
historyIPs = getHistoryIPs(link.attrs["href"])
for historyIP in historyIPs:
print(historyIP)
newLink = links[random.randint(0, len(links)-1)].attrs["href"]
links = getLinks(newLink)
# combine this with the getCountry function from the previous section in order to resolve these IP addresses to countries
def getCountry(ipAddress):
try:
response = urlopen("http://freegeoip.net/json/"
+ipAddress).read().decode('utf-8')
except HTTPError:
return None
responseJson = json.loads(response)
return responseJson.get("country_code")
links = getLinks("/wiki/Python_(programming_language)")
while(len(links) > 0):
for link in links:
print("-------------------")
historyIPs = getHistoryIPs(link.attrs["href"])
for historyIP in historyIPs:
country = getCountry(historyIP)
if country is not None:
print(historyIP+" is from "+country)
newLink = links[random.randint(0, len(links)-1)].attrs["href"]
links = getLinks(newLink)

storing data

csv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import csv
csvFile = open("../files/test.csv", 'w+')
try:
writer = csv.writer(csvFile)
writer.writerow(('number', 'number plus 2', 'number times 2'))
for i in range(10):
writer.writerow( (i, i+2, i*2))
finally:
csvFile.close()
# second example
import csv
from urllib.request import urlopen
from bs4 import BeautifulSoup
html = urlopen("http://en.wikipedia.org/wiki/Comparison_of_text_editors")
bsObj = BeautifulSoup(html)
#The main comparison table is currently the first table on the page
table = bsObj.findAll("table",{"class":"wikitable"})[0]
rows = table.findAll("tr")
csvFile = open("../files/editors.csv", 'wt')
writer = csv.writer(csvFile)
try:
for row in rows:
csvRow = []
for cell in row.findAll(['td', 'th']):
csvRow.append(cell.get_text())
writer.writerow(csvRow)
finally:
csvFile.close()

mysql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import pymysql
conn = pymysql.connect(host='127.0.0.1', unix_socket='/tmp/mysql.sock',
user='root', passwd=None, db='mysql')
cur = conn.cursor()
cur.execute("USE scraping")
cur.execute("SELECT * FROM pages WHERE id=1")
print(cur.fetchone())
cur.close()
conn.close()
# example
from urllib.request import urlopen
from bs4 import BeautifulSoup
import datetime
import random
import pymysql
conn = pymysql.connect(host='127.0.0.1', unix_socket='/tmp/mysql.sock',
user='root', passwd=None, db='mysql', charset='utf8')
cur = conn.cursor()
cur.execute("USE scraping")
random.seed(datetime.datetime.now())
def store(title, content):
cur.execute("INSERT INTO pages (title, content) VALUES (\"%s\",
\"%s\")", (title, content))
cur.connection.commit()
def getLinks(articleUrl):
html = urlopen("http://en.wikipedia.org"+articleUrl)
bsObj = BeautifulSoup(html)
title = bsObj.find("h1").find("span").get_text()
content = bsObj.find("div", {"id":"mw-content-text"}).find("p").get_text()
store(title, content)
return bsObj.find("div", {"id":"bodyContent"}).findAll("a",
href=re.compile("^(/wiki/)((?!:).)*$"))
links = getLinks("/wiki/Kevin_Bacon")
try:
while len(links) > 0:
newArticle = links[random.randint(0, len(links)-1)].attrs["href"]
print(newArticle)
links = getLinks(newArticle)
finally:
cur.close()
conn.close()

Email

1
2
3
4
5
6
7
8
9
import smtplib
from email.mime.text import MIMEText
msg = MIMEText("The body of the email is here")
msg['Subject'] = "An Email Alert"
msg['From'] = "ryan@pythonscraping.com"
msg['To'] = "webmaster@pythonscraping.com"
s = smtplib.SMTP('localhost')
s.send_message(msg)
s.quit()

Reading Documents

text

1
2
3
4
from urllib.request import urlopen
textPage = urlopen(
"http://www.pythonscraping.com/pages/warandpeace/chapter1-ru.txt")
print(str(textPage.read(), 'utf-8'))

CSV

1
2
3
4
5
6
7
8
9
from urllib.request import urlopen
from io import StringIO
import csv
data = urlopen("http://pythonscraping.com/files/MontyPythonAlbums.csv")
.read().decode('ascii', 'ignore')
dataFile = StringIO(data)
csvReader = csv.reader(dataFile)
for row in csvReader:
print("The album \""+row[0]+"\" was released in "+str(row[1]))

word

1
2
3
4
5
6
7
8
from zipfile import ZipFile
from urllib.request import urlopen
from io import BytesIO
wordFile = urlopen("http://pythonscraping.com/pages/AWordDocument.docx").read()
wordFile = BytesIO(wordFile)
document = ZipFile(wordFile)
xml_content = document.read('word/document.xml')
print(xml_content.decode('utf-8'))

PDF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from urllib.request import urlopen
from pdfminer.pdfinterp import PDFResourceManager, process_pdf
from pdfminer.converter import TextConverter
from pdfminer.layout import LAParams
from io import StringIO
from io import open
def readPDF(pdfFile):
rsrcmgr = PDFResourceManager()
retstr = StringIO()
laparams = LAParams()
device = TextConverter(rsrcmgr, retstr, laparams=laparams)
process_pdf(rsrcmgr, device, pdfFile)
device.close()
content = retstr.getvalue()
retstr.close()
return content
pdfFile = urlopen("http://pythonscraping.com/pages/warandpeace/chapter1.pdf");
outputString = readPDF(pdfFile)
print(outputString)
pdfFile.close()